package cn.jingyuan.bee.utils.thread;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.*;

/**
 * 线程池工具
 */
public class ThreadUtils {

    /**
     * 新建一个线程池
     *
     * @param threadSize 同时执行的线程数大小
     *
     * @return ExecutorService
     */
    public static ExecutorService newExecutor(int threadSize) {
        ExecutorBuilder builder = ExecutorBuilder.create();
        if (threadSize > 0) {
            builder.setCorePoolSize(threadSize);
        }
        return builder.build();
    }

    /**
     * 获得一个新的线程池
     *
     * @return ExecutorService
     */
    public static ExecutorService newExecutor() {
        return ExecutorBuilder.create().useSynchronousQueue().build();
    }

    /**
     * 获得一个新的线程池，只有单个线程
     *
     * @return ExecutorService
     */
    public static ExecutorService newSingleExecutor() {
        return Executors.newSingleThreadExecutor();
    }

    /**
     * 获得一个新的线程池<br>
     * 如果 maximumPoolSize =》 corePoolSize，在没有新任务加入的情况下，多出的线程将最多保留 60s
     *
     * @param corePoolSize 初始线程池大小
     * @param maximumPoolSize 最大线程池大小
     *
     * @return {@link ThreadPoolExecutor}
     */
    public static ThreadPoolExecutor newExecutor(int corePoolSize, int maximumPoolSize) {
        return ExecutorBuilder.create().setCorePoolSize(corePoolSize).setMaxPoolSize(maximumPoolSize).build();
    }

    /**
     * 获得一个新的线程池<br>
     * 传入阻塞系数，线程池的大小计算公式为：CPU 可用核心数 / (1 - 阻塞因子)<br>
     * Blocking Coefficient(阻塞系数) = 阻塞时间／（阻塞时间+使用 CPU 的时间）<br>
     * 计算密集型任务的阻塞系数为 0，而 IO 密集型任务的阻塞系数则接近于 1。
     * <p>
     * see: http://blog.csdn.net/partner4java/article/details/9417663
     *
     * @param blockingCoefficient 阻塞系数，阻塞因子介于 0~1 之间的数，阻塞因子越大，线程池中的线程数越多。
     *
     * @return {@link ThreadPoolExecutor}
     */
    public static ThreadPoolExecutor newExecutorByBlockingCoefficient(float blockingCoefficient) {
        if (blockingCoefficient >= 1 || blockingCoefficient < 0) {
            throw new IllegalArgumentException("[blockingCoefficient] must between 0 and 1, or equals 0.");
        }

        // 最佳的线程数 = CPU 可用核心数 / (1 - 阻塞系数)
        int poolSize = (int) (Runtime.getRuntime().availableProcessors() / (1 - blockingCoefficient));
        return ExecutorBuilder.create().setCorePoolSize(poolSize).setMaxPoolSize(poolSize).setKeepAliveTime(0L).build();
    }

    /**
     * 直接在公共线程池中执行线程
     *
     * @param runnable 可运行对象
     */
    public static void execute(Runnable runnable) {
        GlobalThreadPool.execute(runnable);
    }

    /**
     * 执行异步方法
     *
     * @param runnable 需要执行的方法体
     * @param isDaemon 是否守护线程。守护线程会在主线程结束后自动结束
     *
     * @return 执行的方法体
     */
    public static Runnable execAsync(final Runnable runnable, boolean isDaemon) {
        Thread thread = new Thread(runnable);
        thread.setDaemon(isDaemon);
        thread.start();

        return runnable;
    }

    /**
     * 执行有返回值的异步方法<br>
     * Future 代表一个异步执行的操作，通过 get()方法可以获得操作的结果，如果异步操作还没有完成，则，get()会使当前线程阻塞
     *
     * @param <T> 回调对象类型
     * @param task {@link Callable}
     *
     * @return Future
     */
    public static <T> Future<T> execAsync(Callable<T> task) {
        return GlobalThreadPool.submit(task);
    }

    /**
     * 执行有返回值的异步方法<br>
     * Future 代表一个异步执行的操作，通过 get() 方法可以获得操作的结果，如果异步操作还没有完成，则，get() 会使当前线程阻塞
     *
     * @param runnable 可运行对象
     *
     * @return {@link Future}
     */
    public static Future<?> execAsync(Runnable runnable) {
        return GlobalThreadPool.submit(runnable);
    }

    /**
     * 新建一个 CompletionService，调用其 submit 方法可以异步执行多个任务，最后调用 take 方法按照完成的顺序获得其结果。<br>
     * 若未完成，则会阻塞
     *
     * @param <T> 回调对象类型
     *
     * @return CompletionService
     */
    public static <T> CompletionService<T> newCompletionService() {
        return new ExecutorCompletionService<>(GlobalThreadPool.getExecutor());
    }

    /**
     * 新建一个 CompletionService，调用其 submit 方法可以异步执行多个任务，最后调用 take 方法按照完成的顺序获得其结果。<br>
     * 若未完成，则会阻塞
     *
     * @param <T> 回调对象类型
     * @param executor 执行器 {@link ExecutorService}
     *
     * @return CompletionService
     */
    public static <T> CompletionService<T> newCompletionService(ExecutorService executor) {
        return new ExecutorCompletionService<>(executor);
    }

    /**
     * 新建一个 CountDownLatch，一个同步辅助类，在完成一组正在其他线程中执行的操作之前，它允许一个或多个线程一直等待。
     *
     * @param threadCount 线程数量
     *
     * @return CountDownLatch
     */
    public static CountDownLatch newCountDownLatch(int threadCount) {
        return new CountDownLatch(threadCount);
    }

    /**
     * 创建新线程，非守护线程，正常优先级，线程组与当前线程的线程组一致
     *
     * @param runnable {@link Runnable}
     * @param name 线程名
     *
     * @return {@link Thread}
     */
    public static Thread newThread(Runnable runnable, String name) {
        final Thread t = newThread(runnable, name, false);
        if (t.getPriority() != Thread.NORM_PRIORITY) {
            t.setPriority(Thread.NORM_PRIORITY);
        }
        return t;
    }

    /**
     * 创建新线程
     *
     * @param runnable {@link Runnable}
     * @param name 线程名
     * @param isDaemon 是否守护线程
     *
     * @return {@link Thread}
     */
    public static Thread newThread(Runnable runnable, String name, boolean isDaemon) {
        final Thread t = new Thread(null, runnable, name);
        t.setDaemon(isDaemon);
        return t;
    }

    /**
     * 挂起当前线程
     *
     * @param timeout 挂起的时长
     * @param timeUnit 时长单位
     *
     * @return 被中断返回 false，否则 true
     */
    public static boolean sleep(Number timeout, TimeUnit timeUnit) {
        try {
            timeUnit.sleep(timeout.longValue());
        } catch (InterruptedException e) {
            return false;
        }
        return true;
    }

    /**
     * 挂起当前线程
     *
     * @param millis 挂起的毫秒数
     *
     * @return 被中断返回 false，否则 true
     */
    public static boolean sleep(Number millis) {
        if (millis == null) {
            return true;
        }

        try {
            Thread.sleep(millis.longValue());
        } catch (InterruptedException e) {
            return false;
        }
        return true;
    }

    /**
     * 考虑 {@link Thread#sleep(long)} 方法有可能时间不足给定毫秒数，此方法保证 sleep 时间不小于给定的毫秒数
     *
     * @param millis 给定的 sleep 时间
     *
     * @return 被中断返回 false，否则 true
     *
     * @see ThreadUtils#sleep(Number)
     */
    public static boolean safeSleep(Number millis) {
        long millisLong = millis.longValue();
        long done = 0;
        while (done < millisLong) {
            long before = System.currentTimeMillis();
            if (!sleep(millisLong - done)) {
                return false;
            }
            long after = System.currentTimeMillis();
            done += (after - before);
        }
        return true;
    }

    /**
     * @return 获得堆栈列表
     */
    public static StackTraceElement[] getStackTrace() {
        return Thread.currentThread().getStackTrace();
    }

    /**
     * 获得堆栈项
     *
     * @param i 第几个堆栈项
     *
     * @return 堆栈项
     */
    public static StackTraceElement getStackTraceElement(int i) {
        StackTraceElement[] stackTrace = getStackTrace();
        if (i < 0) {
            i += stackTrace.length;
        }
        return stackTrace[i];
    }

    /**
     * 创建本地线程对象
     *
     * @param <T> 持有对象类型
     * @param isInheritable 是否为子线程提供从父线程那里继承的值
     *
     * @return 本地线程
     */
    public static <T> ThreadLocal<T> createThreadLocal(boolean isInheritable) {
        if (isInheritable) {
            return new InheritableThreadLocal<>();
        } else {
            return new ThreadLocal<>();
        }
    }

    /**
     * 创建 ThreadFactoryBuilder
     *
     * @return ThreadFactoryBuilder
     *
     * @see ThreadFactoryBuilder#build()
     */
    public static ThreadFactoryBuilder createThreadFactoryBuilder() {
        return ThreadFactoryBuilder.create();
    }

    /**
     * 结束线程，调用此方法后，线程将抛出 {@link InterruptedException} 异常
     *
     * @param thread 线程
     * @param isJoin 是否等待结束
     */
    public static void interrupt(Thread thread, boolean isJoin) {
        if (null != thread && !thread.isInterrupted()) {
            thread.interrupt();
            if (isJoin) {
                waitForDie(thread);
            }
        }
    }

    /**
     * 等待线程结束. 调用 {@link Thread#join()} 并忽略 {@link InterruptedException}
     *
     * @param thread 线程
     */
    public static void waitForDie(Thread thread) {
        if (null == thread) {
            return;
        }

        boolean dead = false;
        do {
            try {
                thread.join();
                dead = true;
            } catch (InterruptedException e) {
                // ignore
            }
        } while (!dead);
    }

    /**
     * 获取 JVM 中与当前线程同组的所有线程<br>
     *
     * @return 线程对象数组
     */
    public static Thread[] getThreads() {
        return getThreads(Thread.currentThread().getThreadGroup().getParent());
    }

    /**
     * 获取 JVM 中与当前线程同组的所有线程<br>
     * 使用数组二次拷贝方式，防止在线程列表获取过程中线程终止<br>
     * from Voovan
     *
     * @param group 线程组
     *
     * @return 线程对象数组
     */
    public static Thread[] getThreads(ThreadGroup group) {
        final Thread[] slackList = new Thread[group.activeCount() * 2];
        final int actualSize = group.enumerate(slackList);
        final Thread[] result = new Thread[actualSize];
        System.arraycopy(slackList, 0, result, 0, actualSize);
        return result;
    }

    /**
     * 获取进程的主线程<br>
     * from Voovan
     *
     * @return 进程的主线程
     */
    public static Thread getMainThread() {
        for (Thread thread : getThreads()) {
            if (thread.getId() == 1) {
                return thread;
            }
        }
        return null;
    }

    /**
     * 获取当前线程的线程组
     *
     * @return 线程组
     */
    public static ThreadGroup currentThreadGroup() {
        final SecurityManager s = System.getSecurityManager();
        return (null != s) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
    }

    /**
     * 创建线程工厂
     *
     * @param prefix 线程名前缀
     * @param isDaemon 是否守护线程
     *
     * @return {@link ThreadFactory}
     */
    public static ThreadFactory newNamedThreadFactory(String prefix, boolean isDaemon) {
        return new NamedThreadFactory(prefix, isDaemon);
    }

    /**
     * 创建线程工厂
     *
     * @param prefix 线程名前缀
     * @param threadGroup 线程组，可以为 null
     * @param isDaemon 是否守护线程
     *
     * @return {@link ThreadFactory}
     */
    public static ThreadFactory newNamedThreadFactory(String prefix, ThreadGroup threadGroup, boolean isDaemon) {
        return new NamedThreadFactory(prefix, threadGroup, isDaemon);
    }

    /**
     * 创建线程工厂
     *
     * @param prefix 线程名前缀
     * @param threadGroup 线程组，可以为 null
     * @param isDaemon 是否守护线程
     * @param handler 未捕获异常处理
     *
     * @return {@link ThreadFactory}
     */
    public static ThreadFactory newNamedThreadFactory(String prefix, ThreadGroup threadGroup, boolean isDaemon, UncaughtExceptionHandler handler) {
        return new NamedThreadFactory(prefix, threadGroup, isDaemon, handler);
    }

    /**
     * 阻塞当前线程，保证在 main 方法中执行不被退出
     *
     * @param obj 对象所在线程
     */

    public static void sync(Object obj) {
        synchronized (obj) {
            try {
                obj.wait();
            } catch (InterruptedException e) {
                // ignore
            }
        }
    }

    /**
     * 并发测试<br>
     * 此方法用于测试多线程下执行某些逻辑的并发性能<br>
     * 调用此方法会导致当前线程阻塞。<br>
     * 结束后可调用 {@link ConcurrencyTester#getInterval()} 方法获取执行时间
     *
     * @param threadSize 并发线程数
     * @param runnable 执行的逻辑实现
     *
     * @return {@link ConcurrencyTester}
     */
    public static ConcurrencyTester concurrencyTest(int threadSize, Runnable runnable) {
        return (new ConcurrencyTester(threadSize)).test(runnable);
    }

}
